import { SpanKind } from "@opentelemetry/api";
import type { SemConvAttributes } from "langwatch/observability";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { EventSourcedQueueDefinition } from "../../../library/queues";

/**
 * Shared test doubles for the logger and the tracer.
 *
 * Vitest hoists `vi.mock` calls above the file's imports, so the doubles that
 * the mock factories hand out are created through `vi.hoisted`. That keeps
 * them initialised before any mocked module can be loaded, instead of relying
 * on module-level `const`s that are declared later in the file.
 */
const { mockLogger, mockTracer } = vi.hoisted(() => ({
  mockLogger: {
    info: vi.fn(),
    error: vi.fn(),
    debug: vi.fn(),
  },
  mockTracer: {
    // No real span is created: the traced callback is invoked directly and
    // its result (typically a promise) is returned to the caller.
    withActiveSpan: vi.fn((_name, _options, fn) => fn()),
  },
}));

// Every logger created through the mocked module is the shared mockLogger.
vi.mock("../../../../utils/logger", () => ({
  createLogger: vi.fn(() => mockLogger),
}));

// Every tracer obtained through the mocked module is the shared mockTracer.
vi.mock("langwatch", () => ({
  getLangWatchTracer: vi.fn(() => mockTracer),
}));

import { EventSourcedQueueProcessorMemory } from "../memory";

describe("EventSourcedQueueProcessorMemory", () => {
  // Clear the recorded state of every mock both before and after each test
  // so that call history never leaks from one case into the next.
  const resetMockState = (): void => {
    vi.clearAllMocks();
  };

  beforeEach(resetMockState);
  afterEach(resetMockState);

  describe("send", () => {
    it("immediately processes payload through process function", async () => {
      // Arrange: a queue whose handler resolves without a value.
      const handler = vi.fn().mockResolvedValue(undefined);
      const queueDefinition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: handler,
      };
      const queue = new EventSourcedQueueProcessorMemory(queueDefinition);

      // Act
      await queue.send("test-payload");

      // Assert: by the time send() resolved, the handler had seen the payload.
      expect(handler).toHaveBeenCalledWith("test-payload");
    });

    it("creates tracing spans when processing", async () => {
      const handler = vi.fn().mockResolvedValue(undefined);
      const queueDefinition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: handler,
      };
      const queue = new EventSourcedQueueProcessorMemory(queueDefinition);

      await queue.send("test-payload");

      // The tracer must be asked for a "pipeline.process" span of INTERNAL
      // kind, wrapping some callback.
      const expectedSpanOptions = expect.objectContaining({
        kind: SpanKind.INTERNAL,
      });
      expect(mockTracer.withActiveSpan).toHaveBeenCalledWith(
        "pipeline.process",
        expectedSpanOptions,
        expect.any(Function),
      );
      // The payload still reaches the handler while tracing is in place.
      expect(handler).toHaveBeenCalledWith("test-payload");
    });

    it("includes custom span attributes from callback", async () => {
      const handler = vi.fn().mockResolvedValue(undefined);
      // Attribute callback under test: derives one attribute from the payload.
      const payloadLengthAttribute = (payload: string): SemConvAttributes => ({
        "custom.attr": payload.length,
      });
      const attributesSpy = vi.fn(payloadLengthAttribute);
      const queueDefinition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: handler,
        spanAttributes: attributesSpy,
      };
      const queue = new EventSourcedQueueProcessorMemory(queueDefinition);

      await queue.send("test-payload");

      // The callback is consulted with the payload and a span is opened.
      // NOTE(review): the attributes actually attached to the span are not
      // asserted here — consider checking the options passed to withActiveSpan.
      expect(attributesSpy).toHaveBeenCalledWith("test-payload");
      expect(mockTracer.withActiveSpan).toHaveBeenCalled();
    });

    it("propagates errors from process function", async () => {
      // Handler that always rejects with a known error.
      const failure = new Error("Processing error");
      const handler = vi.fn().mockRejectedValue(failure);
      const queueDefinition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: handler,
      };
      const queue = new EventSourcedQueueProcessorMemory(queueDefinition);

      // send() must surface the handler's rejection to the caller.
      const pendingSend = queue.send("test-payload");
      await expect(pendingSend).rejects.toThrow("Processing error");

      expect(handler).toHaveBeenCalledWith("test-payload");
    });

    it("awaits processing completion before returning", async () => {
      // Deferred promise: the test decides when "processing" finishes.
      // Initialised with a no-op so no non-null assertion is needed; the
      // promise executor runs synchronously and installs the real resolver.
      let resolveProcess: () => void = () => undefined;
      const processPromise = new Promise<void>((resolve) => {
        resolveProcess = resolve;
      });
      const processFn = vi.fn().mockReturnValue(processPromise);
      const definition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: processFn,
      };

      const processor = new EventSourcedQueueProcessorMemory(definition);

      // Track whether send() has settled so its pending state can be asserted.
      let sendSettled = false;
      const sendPromise = processor.send("test-payload").then(() => {
        sendSettled = true;
      });

      expect(processFn).toHaveBeenCalledWith("test-payload");
      expect(mockTracer.withActiveSpan).toHaveBeenCalled();

      // Give all queued microtasks (and one macrotask turn) a chance to run.
      // send() must still be pending because processing has not finished.
      await new Promise<void>((resolve) => setTimeout(resolve, 0));
      expect(sendSettled).toBe(false);

      // Finish processing; only now may send() resolve.
      resolveProcess();
      await sendPromise;

      expect(sendSettled).toBe(true);
      expect(processFn).toHaveBeenCalledTimes(1);
    });

    it("handles concurrent send calls", async () => {
      const handler = vi.fn().mockResolvedValue(undefined);
      const queueDefinition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: handler,
      };
      const queue = new EventSourcedQueueProcessorMemory(queueDefinition);

      // Start all sends before awaiting any of them.
      const payloads = ["payload-1", "payload-2", "payload-3"];
      await Promise.all(payloads.map((payload) => queue.send(payload)));

      // Each payload is processed exactly once in total.
      expect(handler).toHaveBeenCalledTimes(payloads.length);
      for (const payload of payloads) {
        expect(handler).toHaveBeenCalledWith(payload);
      }
    });

    it("silently ignores unsupported options (makeJobId, delay, concurrency)", async () => {
      // NOTE(review): this test only verifies that send() resolves and that
      // the handler receives the payload when these options are supplied. It
      // does not assert whether makeJobId, delay or concurrency are actually
      // ignored or honored — confirm against the implementation.
      const handler = vi.fn().mockResolvedValue(undefined);
      const jobIdFactory = vi.fn((payload: string) => `job-${payload}`);
      const queueDefinition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: handler,
        makeJobId: jobIdFactory,
        delay: 10, // kept small so the test stays fast if the delay is applied
        options: { concurrency: 1 },
      };
      const queue = new EventSourcedQueueProcessorMemory(queueDefinition);

      await queue.send("test-payload");

      // Supplying the extra options must not prevent normal processing.
      expect(handler).toHaveBeenCalledWith("test-payload");
    });
  });

  describe("close", () => {
    it("completes without errors", async () => {
      const handler = vi.fn().mockResolvedValue(undefined);
      const queueDefinition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: handler,
      };
      const queue = new EventSourcedQueueProcessorMemory(queueDefinition);

      // The test passes as long as close() does not reject.
      await queue.close();
    });

    it("can be called multiple times safely", async () => {
      const handler = vi.fn().mockResolvedValue(undefined);
      const queueDefinition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: handler,
      };
      const queue = new EventSourcedQueueProcessorMemory(queueDefinition);

      // Close three times in sequence; none of the calls may reject.
      for (let attempt = 0; attempt < 3; attempt += 1) {
        await queue.close();
      }
    });

    it("allows send after close (memory implementation has no state)", async () => {
      const handler = vi.fn().mockResolvedValue(undefined);
      const queueDefinition: EventSourcedQueueDefinition<string> = {
        name: "test-queue",
        process: handler,
      };
      const queue = new EventSourcedQueueProcessorMemory(queueDefinition);

      // Close first, then send: the payload must still reach the handler.
      await queue.close();
      await queue.send("test-payload");

      expect(handler).toHaveBeenCalledWith("test-payload");
    });
  });
});
